package kafka_go

import (
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"time"
)

//消费消息上下文
//groupId string, topic string, partition int32, offset int64, message []byte, timeStamp time.Time, consumerVal *sarama.ConsumerMessage
type ConsumerMessageContext struct {
	builder         BuildConsumerApi
	groupId         string
	topic           string
	partition       int32
	offset          int64
	message         []byte
	timeStamp       time.Time
	val             *sarama.ConsumerMessage
	consumerSession *ConsumerSession
}

func (this *ConsumerMessageContext) reset() {
	this.builder = nil
	this.groupId = ""
	this.topic = ""
	this.partition = 0
	this.offset = 0
	this.message = nil
	this.timeStamp = time.Time{}
	this.val = nil
	this.consumerSession = nil
}

func (this *ConsumerMessageContext) GetBuilder() BuildConsumerApi {
	return this.builder
}
func (this *ConsumerMessageContext) GetGroupId() string {
	return this.groupId
}
func (this *ConsumerMessageContext) GetTopic() string {
	return this.topic
}
func (this *ConsumerMessageContext) GetPartition() int32 {
	return this.partition
}
func (this *ConsumerMessageContext) GetOffset() int64 {
	return this.offset
}
func (this *ConsumerMessageContext) GetMessage() []byte {
	return this.message
}
func (this *ConsumerMessageContext) GetMessageString() string {
	return string(this.message)
}

func (this *ConsumerMessageContext) GetTimeStamp() time.Time {
	return this.timeStamp
}
func (this *ConsumerMessageContext) GetVal() *sarama.ConsumerMessage {
	return this.val
}
func (this *ConsumerMessageContext) GetSession() *ConsumerSession {
	return this.consumerSession
}

type ConsumerSession struct {
	session   sarama.ConsumerGroupSession
	message   *sarama.ConsumerMessage
	isAutoAck bool
}

func NewConsumerSession(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage, isAutoAck bool) *ConsumerSession {
	return &ConsumerSession{
		session:   session,
		message:   message,
		isAutoAck: isAutoAck,
	}
}

func (this *ConsumerSession) Ack() {
	if this.session != nil && !this.isAutoAck {
		this.session.MarkMessage(this.message, "")
		this.session.Commit()
	}
}

func (this *ConsumerSession) IsAutoAck() bool {
	return this.isAutoAck
}

func (this *ConsumerSession) GetSession() sarama.ConsumerGroupSession {
	return this.session
}
func (this *ConsumerSession) GetMessage() *sarama.ConsumerMessage {
	return this.message
}
